package com.guchenbo.flink;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.expressions.TimeIntervalUnit;

import static org.apache.flink.table.api.Expressions.$;

public class FlinkSql2 {
    public static void main(String[] args) {
        String sql="select * from send_report";

        EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.executeSql(sql);

    }

    public static Table report(Table transactions) {
        return transactions.select($("account_id"), $("transaction_time").floor(TimeIntervalUnit.SECOND).as("log_ts"),
                                        $("amount")).groupBy($("account_id"), $("log_ts"))
                        .select($("account_id"), $("log_ts"), $("amount").sum().as("amount"));
    }
}
